/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.core;



import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.metrics.Counter;

import com.bff.gaia.unified.sdk.metrics.Metrics;

import com.bff.gaia.unified.sdk.state.State;

import com.bff.gaia.unified.sdk.state.StateSpec;

import com.bff.gaia.unified.sdk.state.TimeDomain;

import com.bff.gaia.unified.sdk.transforms.DoFn;

import com.bff.gaia.unified.sdk.transforms.reflect.DoFnSignature;

import com.bff.gaia.unified.sdk.transforms.reflect.DoFnSignatures;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.sdk.transforms.windowing.NonMergingWindowFn;

import com.bff.gaia.unified.sdk.transforms.windowing.WindowFn;

import com.bff.gaia.unified.sdk.util.WindowTracing;

import com.bff.gaia.unified.sdk.util.WindowedValue;

import com.bff.gaia.unified.sdk.values.WindowingStrategy;

import org.joda.time.Instant;



import java.util.Map;



/**

 * A customized {@link DoFnRunner} that handles late data dropping and garbage collection for

 * stateful {@link DoFn DoFns}. It registers a GC timer in {@link #processElement(WindowedValue)}

 * and does cleanup in {@link #onTimer(String, BoundedWindow, Instant, TimeDomain)}

 *

 * @param <InputT> the type of the {@link DoFn} (main) input elements

 * @param <OutputT> the type of the {@link DoFn} (main) output elements

 */

public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>

    implements DoFnRunner<InputT, OutputT> {



  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "StatefulParDoDropped";



  private final DoFnRunner<InputT, OutputT> doFnRunner;

  private final WindowingStrategy<?, ?> windowingStrategy;

  private final Counter droppedDueToLateness =

      Metrics.counter(StatefulDoFnRunner.class, DROPPED_DUE_TO_LATENESS_COUNTER);

  private final CleanupTimer<InputT> cleanupTimer;

  private final StateCleaner stateCleaner;



  public StatefulDoFnRunner(

      DoFnRunner<InputT, OutputT> doFnRunner,

      WindowingStrategy<?, ?> windowingStrategy,

      CleanupTimer<InputT> cleanupTimer,

      StateCleaner<W> stateCleaner) {

    this.doFnRunner = doFnRunner;

    this.windowingStrategy = windowingStrategy;

    this.cleanupTimer = cleanupTimer;

    this.stateCleaner = stateCleaner;

    WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();

    rejectMergingWindowFn(windowFn);

  }



  private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) {

    if (!(windowFn instanceof NonMergingWindowFn)) {

      throw new UnsupportedOperationException(

          "MergingWindowFn is not supported for stateful DoFns, WindowFn is: " + windowFn);

    }

  }



  @Override

  public DoFn<InputT, OutputT> getFn() {

    return doFnRunner.getFn();

  }



  @Override

  public void startBundle() {

    doFnRunner.startBundle();

  }



  @Override

  public void processElement(WindowedValue<InputT> input) {



    // StatefulDoFnRunner always observes windows, so we need to explode

    for (WindowedValue<InputT> value : input.explodeWindows()) {



      BoundedWindow window = value.getWindows().iterator().next();



      if (isLate(window)) {

        // The element is too late for this window.

        droppedDueToLateness.inc();

        WindowTracing.debug(

            "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "

                + "since too far behind inputWatermark:{}",

            input.getTimestamp(),

            window,

            cleanupTimer.currentInputWatermarkTime());

      } else {

        cleanupTimer.setForWindow(value.getValue(), window);

        doFnRunner.processElement(value);

      }

    }

  }



  private boolean isLate(BoundedWindow window) {

    Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);

    Instant inputWM = cleanupTimer.currentInputWatermarkTime();

    return gcTime.isBefore(inputWM);

  }



  @Override

  public void onTimer(

      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {

    if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {

      stateCleaner.clearForWindow(window);

      // There should invoke the onWindowExpiration of DoFn

    } else {

      // An event-time timer can never be late because we don't allow setting timers after GC time.

      // Ot can happen that a processing-time time fires for a late window, we need to ignore

      // this.

      if (!timeDomain.equals(TimeDomain.EVENT_TIME) && isLate(window)) {

        // don't increment the dropped counter, only do that for elements

        WindowTracing.debug(

            "StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} "

                + "since window is too far behind inputWatermark:{}",

            timestamp,

            window,

            cleanupTimer.currentInputWatermarkTime());

      } else {

        doFnRunner.onTimer(timerId, window, timestamp, timeDomain);

      }

    }

  }



  @Override

  public void finishBundle() {

    doFnRunner.finishBundle();

  }



  /**

   * A cleaner for deciding when to clean state of window.

   *

   * <p>A runner might either (a) already know that it always has a timer set for the expiration

   * time or (b) not need a timer at all because it is a batch runner that discards state when it is

   * done.

   */

  public interface CleanupTimer<InputT> {



    /**

     * Return the current, local input watermark timestamp for this computation in the {@link

     * TimeDomain#EVENT_TIME} time domain.

     */

    Instant currentInputWatermarkTime();



    /** Set the garbage collect time of the window to timer. */

    void setForWindow(InputT value, BoundedWindow window);



    /** Checks whether the given timer is a cleanup timer for the window. */

    boolean isForWindow(

		String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain);

  }



  /** A cleaner to clean all states of the window. */

  public interface StateCleaner<W extends BoundedWindow> {



    void clearForWindow(W window);

  }



  /** A {@link StatefulDoFnRunner.CleanupTimer} implemented via {@link TimerInternals}. */

  public static class TimeInternalsCleanupTimer<InputT>

      implements StatefulDoFnRunner.CleanupTimer<InputT> {



    public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";



    /**

     * The amount of milliseconds by which to delay cleanup. We use this to ensure that state is

     * still available when a user timer for {@code window.maxTimestamp()} fires.

     */

    public static final long GC_DELAY_MS = 1;



    private final TimerInternals timerInternals;

    private final WindowingStrategy<?, ?> windowingStrategy;

    private final Coder<BoundedWindow> windowCoder;



    public TimeInternalsCleanupTimer(

        TimerInternals timerInternals, WindowingStrategy<?, ?> windowingStrategy) {

      this.windowingStrategy = windowingStrategy;

      WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();

      windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();

      this.timerInternals = timerInternals;

    }



    @Override

    public Instant currentInputWatermarkTime() {

      return timerInternals.currentInputWatermarkTime();

    }



    @Override

    public void setForWindow(InputT input, BoundedWindow window) {

      Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);

      // make sure this fires after any window.maxTimestamp() timers

      gcTime = gcTime.plus(GC_DELAY_MS);

      timerInternals.setTimer(

          StateNamespaces.window(windowCoder, window), GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);

    }



    @Override

    public boolean isForWindow(

        String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {

      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);

      Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);

      gcTime = gcTime.plus(GC_DELAY_MS);

      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);

    }

  }



  /** A {@link StatefulDoFnRunner.StateCleaner} implemented via {@link StateInternals}. */

  public static class StateInternalsStateCleaner<W extends BoundedWindow>

      implements StatefulDoFnRunner.StateCleaner<W> {



    private final DoFn<?, ?> fn;

    private final DoFnSignature signature;

    private final StateInternals stateInternals;

    private final Coder<W> windowCoder;



    public StateInternalsStateCleaner(

		DoFn<?, ?> fn, StateInternals stateInternals, Coder<W> windowCoder) {

      this.fn = fn;

      this.signature = DoFnSignatures.getSignature(fn.getClass());

      this.stateInternals = stateInternals;

      this.windowCoder = windowCoder;

    }



    @Override

    public void clearForWindow(W window) {

      for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :

          signature.stateDeclarations().entrySet()) {

        try {

          StateSpec<?> spec = (StateSpec<?>) entry.getValue().field().get(fn);

          State state =

              stateInternals.state(

                  StateNamespaces.window(windowCoder, window),

                  StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));

          state.clear();

        } catch (IllegalAccessException e) {

          throw new RuntimeException(e);

        }

      }

    }

  }

}